package com.danan.data_collector.util;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.connector.kafka.sink.TopicSelector;
import org.apache.kafka.clients.producer.ProducerConfig;

import java.util.Properties;
import java.util.UUID;

/**
 * Created with IntelliJ IDEA.
 *
 * @Author: NanHuang
 * @Date: 2023/05/28/10:32
 * @Description:
 */
public class SinkUtil {

    public static KafkaSink<String> getKafkaSink(){
        Properties properties = new Properties();
        properties.setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 24 * 60 * 60 * 1000 + "");
        properties.setProperty(ProducerConfig.MAX_REQUEST_SIZE_CONFIG,1048576 * 2 + "");
//        properties.setProperty(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "1");
//        properties.setProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
        return KafkaSink.<String>builder()
                .setBootstrapServers(ConfigUtil.getProperty("kafka.bootstrap.server"))
                .setRecordSerializer(
                        KafkaRecordSerializationSchema.builder()
                                .setTopicSelector(new TopicSelector<String>() {
                                    @Override
                                    public String apply(String s) {
                                        return String.format("ods_%s_%s",ConfigUtil.getProperty("data.source.database"),JSON.parseObject(s).getString("schema").toLowerCase());
                                    }
                                })
//                                .setTopic("data_test")
                                .setValueSerializationSchema(new SimpleStringSchema())
                                .build()
                )
                .setKafkaProducerConfig(properties)
                .setDeliverGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
                .setTransactionalIdPrefix(UUID.randomUUID().toString())
                .build();
    }

}
